有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Kafka Java SimpleConsumer奇怪的编码

我试图使用Kafka 9中的SimpleConsumer允许用户从时间偏移量重播事件-但是我从Kafka收到的消息采用了非常奇怪的编码:

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p=
                            ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"

使用Kafkanconsumer可以很好地解析这些消息。下面是我使用SimpleConsumer检索消息的代码:

    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            log.debug("Found an old offset - skip");
            continue;
        }

        readOffset = messageAndOffset.nextOffset();

        int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
        byte[] data = messageAndOffset.message().payload().array();
        byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
        log.debug("Read " + new String(realData, "UTF-8"));
}

在我不断收到UTF-32关于字节太高的错误后,我添加了跳过第一个x字节的代码,我认为这是因为Kafka在有效负载中预先添加了消息大小等信息。这是阿夫罗神器吗


共 (3) 个答案

  1. # 1 楼答案

    您可以使用消息的时间戳(可能不是每次提交)定期记录正在提交的分区偏移量,然后您可以在将来采取一些措施来设置使用者偏移量。我想这是为了生产调试

    我怀疑他们是否会添加这样的功能,考虑到卡夫卡的工作原理,这似乎是不可行的,尽管我可能弄错了,总有天才的东西在发生。我会做记录的事情

  2. # 2 楼答案

    我从来没有找到一个很好的答案,但是我转而使用SimpleConsumer来查询Kafka中我需要的偏移量(每个分区…尽管实现很差),然后使用本机KafkaConsumer使用seek(TopicPartition, offset)seekToBeginning(TopicPartition)来检索消息。希望他们能在下一个版本中向本机客户端添加从给定时间戳检索消息的能力

  3. # 3 楼答案

    你在找这个吗

    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    
        if(payload == null) {
            System.err.println("Message is null : " + readOffset);
            continue;
        }
    
    final byte[] realData = new byte[payload.limit()];
    payload.get(realData);
    System.out.println("Read " + new String(realData, "UTF-8"));